package gotail

import (
	"code.simon.com/data-agent/common"
	"code.simon.com/data-agent/conf"
	"context"
	"github.com/hpcloud/tail"
	"github.com/sirupsen/logrus"
	"strings"
	"sync"
)

// TailTask describes one tailed file together with the state its reader
// goroutine (run) needs.
type TailTask struct {
	// Path is the file path handed to tail.TailFile.
	Path   string
	// Topic is used as the Key of every message this task sends to
	// common.MidChannel; it is empty when the configuration entry names no topic.
	Topic  string
	// TObj is the handle returned by tail.TailFile; run reads its Lines channel.
	TObj   *tail.Tail
	// ctx is checked by run, which returns once ctx is done.
	ctx    context.Context
	// Cancel is the cancel function paired with ctx. InitSource hands the same
	// ctx/Cancel pair to every task it creates in one call.
	Cancel context.CancelFunc
}

// TailTaskMsr keeps track of the tail tasks started by InitSource.
type TailTaskMsr struct {
	// TailTaskMsrMap maps a tailed file path to its TailTask.
	TailTaskMsrMap map[string]*TailTask
}

var (
	// TailMsg is the package-level TailTaskMsr instance; GetTailTaskMsg
	// initializes it on first use and returns its address.
	TailMsg TailTaskMsr
	// once ensures TailMsg is initialized a single time by GetTailTaskMsg.
	once    sync.Once
)

// GetTailTaskMsg returns a pointer to the package-level TailMsg. The first
// call gives it an empty task map; later calls return the same instance
// without touching it again.
func GetTailTaskMsg() *TailTaskMsr {
	initialize := func() {
		TailMsg = TailTaskMsr{TailTaskMsrMap: make(map[string]*TailTask)}
	}
	once.Do(initialize)
	return &TailMsg
}

// InitSource starts one tail task per entry of
// conf.ConfigObj.TailConfig.LogFilePath.
//
// The setting is a comma-separated list whose entries have the form "path"
// or "path->topic". Surrounding whitespace is trimmed and entries with an
// empty path are skipped. Every started task is stored in TailTaskMsrMap
// under its path and gets its own goroutine executing run. All tasks created
// by one call share a single cancellable context, so invoking Cancel on any
// of them stops all of them.
func (TailMsg *TailTaskMsr) InitSource() {
	// A TailTaskMsr that was not obtained through GetTailTaskMsg has a nil
	// map; writing to it below would panic.
	if TailMsg.TailTaskMsrMap == nil {
		TailMsg.TailTaskMsrMap = make(map[string]*TailTask)
	}

	ctx, cancel := context.WithCancel(context.Background())

	config := tail.Config{
		ReOpen:    true,                                 // reopen the file when it is recreated
		Follow:    true,                                 // keep following the file for new lines
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // whence 2 seeks relative to the end of the file
		MustExist: false,                                // do not fail when the file does not exist yet
		Poll:      true,                                 // poll for file changes
	}
	for _, item := range strings.Split(conf.ConfigObj.TailConfig.LogFilePath, ",") {
		mix := strings.Split(item, "->")
		path := strings.TrimSpace(mix[0])
		if path == "" {
			// Empty configuration or a stray comma: there is nothing to tail.
			continue
		}
		var topic string
		if len(mix) > 1 {
			topic = strings.TrimSpace(mix[1])
		}

		tObj, err := tail.TailFile(path, config)
		if err != nil {
			common.HandleError(err)
			// If HandleError returns, the task has no tail handle; starting
			// run would dereference a nil TObj.
			continue
		}

		tt := &TailTask{
			Path:   path,
			Topic:  topic,
			TObj:   tObj,
			ctx:    ctx,
			Cancel: cancel,
		}
		TailMsg.TailTaskMsrMap[path] = tt
		go tt.run()
		logrus.Printf("tail %s: init success", tt.Path)
	}
}

// InitTarget is the target-side initializer. Tail cannot act as a target, so
// the method only reports that through common.HandleNewError.
func (*TailTaskMsr) InitTarget() {
	const notSupported = "tail not as target"
	common.HandleNewError(notSupported)
}

// run forwards every non-blank line read from t.TObj.Lines to
// common.MidChannel, using t.Topic as the message key, and logs the line.
// (The original note says the lines end up in Kafka; the consumer of
// MidChannel is not visible here.)
//
// The loop ends when t.ctx is done or when the Lines channel is closed.
func (t TailTask) run() {
	for {
		select {
		case <-t.ctx.Done():
			return
		case lineMsg, ok := <-t.TObj.Lines:
			if !ok {
				// A closed channel never blocks again, so looping on it
				// would spin forever; stop reading instead.
				logrus.Errorf("tail file closed, filename: %s", t.TObj.Filename)
				return
			}
			// Skip lines that contain nothing but line terminators.
			if strings.Trim(lineMsg.Text, "\r\n") == "" {
				continue
			}
			// Hand the line over through the channel so that sending it on
			// happens asynchronously.
			msgByteArr := []byte(lineMsg.Text)
			select {
			case common.MidChannel <- &common.MidChannelModel{
				Key:   t.Topic,
				Value: &msgByteArr,
			}:
			case <-t.ctx.Done():
				// Do not stay blocked on a full channel after cancellation.
				return
			}
			logrus.Info(lineMsg.Text)
		}
	}
}
